package org.example.infrastructure.persistent.repository;

import cn.bugstack.middleware.db.router.strategy.IDBRouterStrategy;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.example.domain.credit.model.aggregate.TradeAggregate;
import org.example.domain.credit.model.entity.CreditAccountEntity;
import org.example.domain.credit.model.entity.CreditOrderEntity;
import org.example.domain.credit.model.valobj.AccountStatusVO;
import org.example.domain.credit.model.valobj.TradeTypeVO;
import org.example.domain.credit.repository.ICreditRepository;
import org.example.domain.task.model.entity.TaskEntity;
import org.example.infrastructure.event.EventPublisher;
import org.example.infrastructure.persistent.dao.ITaskDao;
import org.example.infrastructure.persistent.dao.IUserCreditAccountDao;
import org.example.infrastructure.persistent.dao.IUserCreditOrderDao;
import org.example.infrastructure.persistent.po.Task;
import org.example.infrastructure.persistent.po.UserCreditAccount;
import org.example.infrastructure.persistent.po.UserCreditOrder;
import org.example.infrastructure.persistent.redis.IRedisService;
import org.example.types.common.Constants;
import org.redisson.api.RLock;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.support.TransactionTemplate;

import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;

@Slf4j
@Repository
public class CreditRepository implements ICreditRepository {

    @Resource
    private IRedisService redisService;

    @Resource
    private IDBRouterStrategy dbRouter;

    @Resource
    private IUserCreditOrderDao userCreditOrderDao;

    @Resource
    private ITaskDao taskDao;

    @Resource
    private IUserCreditAccountDao userCreditAccountDao;

    @Resource
    private TransactionTemplate transactionTemplate;

    @Resource
    private EventPublisher eventPublisher;

    /**
     * 保存积分调额订单
     * 积分调额入账
     * 没有积分账户则添加,这里涉及多线程,查询有没有账户和添加账户两个不是原子操作
     * 如果线程 A 和 B 都判断了不存在账户 , A先进行了新增,B因为也判断了不存在所以也新增,这时就会出错,因此需要加分布式锁,锁住这两个操作
     *
     * 发送积分扣减成功消息,去让活动额度入账
     * @param tradeAggregate
     */
    @Override
    public void saveUserCreditTradeOrder(TradeAggregate tradeAggregate) {
        CreditOrderEntity creditOrderEntity = tradeAggregate.getCreditOrderEntity();
        String userId = tradeAggregate.getUserId();
        CreditAccountEntity creditAccountEntity = tradeAggregate.getCreditAccountEntity();
        TaskEntity taskEntity = tradeAggregate.getTask();

        UserCreditAccount userCreditAccountReq = UserCreditAccount.builder()
                .userId(creditAccountEntity.getUserId())
                .totalAmount(creditAccountEntity.getTotalAmount())
                .availableAmount(creditAccountEntity.getAvailableAmount())
                .accountStatus(AccountStatusVO.open.getCode())
                .build();

        UserCreditOrder userCreditOrder = new UserCreditOrder();
        userCreditOrder.setUserId(creditOrderEntity.getUserId());
        userCreditOrder.setOrderId(creditOrderEntity.getOrderId());
        userCreditOrder.setTradeName(creditOrderEntity.getTradeName().getName());
        userCreditOrder.setTradeType(creditOrderEntity.getTradeType().getCode());
        userCreditOrder.setTradeAmount(creditOrderEntity.getTradeAmount());
        userCreditOrder.setOutBusinessNo(creditOrderEntity.getOutBusinessNo());

        Task task = new Task();
        task.setUserId(taskEntity.getUserId());
        task.setTopic(taskEntity.getTopic());
        task.setMessageId(taskEntity.getMessageId());
        task.setMessage(taskEntity.getMessage());
        task.setState(taskEntity.getState().getCode());

        RLock lock = redisService.getLock(Constants.RedisKey.USER_CREDIT_ACCOUNT_LOCK + userId + Constants.UNDERLINE + creditOrderEntity.getOutBusinessNo());
        try{
            transactionTemplate.execute(status -> {
                lock.lock(3, TimeUnit.SECONDS);
                dbRouter.doRouter(userId);
                try {
                    int updatedRow = userCreditAccountDao.updateAddAmount(userCreditAccountReq);
                    if (updatedRow != 1)
                        userCreditAccountDao.insert(userCreditAccountReq);

                    userCreditOrderDao.insert(userCreditOrder);

                } catch (DuplicateKeyException e) {
                    status.setRollbackOnly();
                    log.error("调整账户积分额度异常，唯一索引冲突 userId:{} orderId:{}", userId, creditOrderEntity.getOrderId(), e);
                }catch (Exception e) {
                    status.setRollbackOnly();
                    log.error("调整账户积分额度失败 userId:{} orderId:{}", userId, creditOrderEntity.getOrderId(), e);
                    throw new RuntimeException(e);
                } finally {
                    dbRouter.clear();
                }
                return 1;
            });
        } finally {
            dbRouter.clear();
            lock.unlock();
        }


        try {
            // 发送消息【在事务外执行，如果失败还有任务补偿】
            eventPublisher.publish(task.getTopic(), task.getMessage());
            // 更新数据库记录，task 任务表
            taskDao.updateTaskSendMessageCompleted(task);
            log.info("调整账户积分记录，发送MQ消息完成 userId: {} orderId:{} topic: {}", userId, creditOrderEntity.getOrderId(), task.getTopic());
        } catch (Exception e) {
            log.error("调整账户积分记录，发送MQ消息失败 userId: {} topic: {}", userId, task.getTopic());
            taskDao.updateTaskSendMessageFail(task);
        }
    }
}
